package flink.examples;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.111.128", 7000).map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = map.keyBy(r -> r.getId());
        WindowedStream<WaterSensor, String, TimeWindow> sensorWs = waterSensorStringKeyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = sensorWs.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                                                                          @Override
                                                                          public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                                                              long count = elements.spliterator().estimateSize();
                                                                              long windowStartTs = context.window().getStart();
                                                                              long windowEndTs = context.window().getEnd();
                                                                              String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                                                              String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                                                              out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包 含 " + count + " 条 数 据 ===>" + elements.toString());
                                                                          }
                                                                      }
        );
        process.print();
        env.execute();
    }
}
